#!/usr/bin/env python3
# coding: utf-8
from __future__ import print_function, unicode_literals

import json
import os
import shutil
import tempfile
import unittest

from copyparty import util
from copyparty.authsrv import VFS, AuthSrv
from tests import util as tu
from tests.util import Cfg


class TestVFS(unittest.TestCase):
    def setUp(self):
        self.td = tu.get_ramdisk()

    def tearDown(self):
        os.chdir(tempfile.gettempdir())
        shutil.rmtree(self.td)

    def dump(self, vfs):
        print(json.dumps(vfs, indent=4, sort_keys=True, default=lambda o: o.__dict__))

    def unfoo(self, foo):
        for k, v in {"foo": "a", "bar": "b", "baz": "c", "qux": "d"}.items():
            foo = foo.replace(k, v)

        return foo

    def undot(self, vfs, query, response):
        self.assertEqual(util.undot(query), response)
        query = self.unfoo(query)
        response = self.unfoo(response)
        self.assertEqual(util.undot(query), response)

    def ls(self, vfs, vpath, uname):
        # type: (VFS, str, str) -> tuple[str, str, str]
        """helper for resolving and listing a folder"""
        vn, rem = vfs.get(vpath, uname, True, False)
        r1 = vn.ls(rem, uname, False, [[True]])
        r2 = vn.ls(rem, uname, False, [[True]])
        self.assertEqual(r1, r2)

        fsdir, real, virt = r1
        real = [x[0] for x in real]
        return fsdir, real, virt

    def log(self, src, msg, c=0):
        pass

    def assertAxs(self, dct, lst):
        t1 = list(sorted(dct))
        t2 = list(sorted(lst))
        self.assertEqual(t1, t2)

    def wipe_vfs(self, td):
        os.chdir("..")
        if os.path.exists(td):
            shutil.rmtree(td)
        os.mkdir(td)
        os.chdir(td)

        for a in "abc":
            for b in "abc":
                for c in "abc":
                    folder = "{0}/{0}{1}/{0}{1}{2}".format(a, b, c)
                    os.makedirs(folder)
                    for d in "abc":
                        fn = "{}/{}{}{}{}".format(folder, a, b, c, d)
                        with open(fn, "w") as f:
                            f.write(fn)

    def test(self):
        td = os.path.join(self.td, "vfs")
        self.wipe_vfs(td)

        # defaults
        self.wipe_vfs(td)
        vfs = AuthSrv(Cfg(), self.log).vfs
        self.assertEqual(vfs.nodes, {})
        self.assertEqual(vfs.vpath, "")
        self.assertEqual(vfs.realpath, td)
        self.assertAxs(vfs.axs.uread, ["*"])
        self.assertAxs(vfs.axs.uwrite, ["*"])

        # single read-only rootfs (relative path)
        self.wipe_vfs(td)
        vfs = AuthSrv(Cfg(v=["a/ab/::r"]), self.log).vfs
        self.assertEqual(vfs.nodes, {})
        self.assertEqual(vfs.vpath, "")
        self.assertEqual(vfs.realpath, os.path.join(td, "a", "ab"))
        self.assertAxs(vfs.axs.uread, ["*"])
        self.assertAxs(vfs.axs.uwrite, [])

        # single read-only rootfs (absolute path)
        self.wipe_vfs(td)
        vfs = AuthSrv(Cfg(v=[td + "//a/ac/../aa//::r"]), self.log).vfs
        self.assertEqual(vfs.nodes, {})
        self.assertEqual(vfs.vpath, "")
        self.assertEqual(vfs.realpath, os.path.join(td, "a", "aa"))
        self.assertAxs(vfs.axs.uread, ["*"])
        self.assertAxs(vfs.axs.uwrite, [])

        # read-only rootfs with write-only subdirectory (read-write for k)
        self.wipe_vfs(td)
        vfs = AuthSrv(
            Cfg(a=["k:k"], v=[".::r:rw,k", "a/ac/acb:a/ac/acb:w:rw,k"]),
            self.log,
        ).vfs
        self.assertEqual(len(vfs.nodes), 1)
        self.assertEqual(vfs.vpath, "")
        self.assertEqual(vfs.realpath, td)
        self.assertAxs(vfs.axs.uread, ["*", "k"])
        self.assertAxs(vfs.axs.uwrite, ["k"])
        n = vfs.nodes["a"]
        self.assertEqual(len(vfs.nodes), 1)
        self.assertEqual(n.vpath, "a")
        self.assertEqual(n.realpath, os.path.join(td, "a"))
        self.assertAxs(n.axs.uread, ["*", "k"])
        self.assertAxs(n.axs.uwrite, ["k"])
        n = n.nodes["ac"]
        self.assertEqual(len(vfs.nodes), 1)
        self.assertEqual(n.vpath, "a/ac")
        self.assertEqual(n.realpath, os.path.join(td, "a", "ac"))
        self.assertAxs(n.axs.uread, ["*", "k"])
        self.assertAxs(n.axs.uwrite, ["k"])
        n = n.nodes["acb"]
        self.assertEqual(n.nodes, {})
        self.assertEqual(n.vpath, "a/ac/acb")
        self.assertEqual(n.realpath, os.path.join(td, "a", "ac", "acb"))
        self.assertAxs(n.axs.uread, ["k"])
        self.assertAxs(n.axs.uwrite, ["*", "k"])

        # something funky about the windows path normalization,
        # doesn't really matter but makes the test messy, TODO?

        fsdir, real, virt = self.ls(vfs, "/", "*")
        self.assertEqual(fsdir, td)
        self.assertEqual(real, ["b", "c"])
        self.assertEqual(list(virt), ["a"])

        fsdir, real, virt = self.ls(vfs, "a", "*")
        self.assertEqual(fsdir, os.path.join(td, "a"))
        self.assertEqual(real, ["aa", "ab"])
        self.assertEqual(list(virt), ["ac"])

        fsdir, real, virt = self.ls(vfs, "a/ab", "*")
        self.assertEqual(fsdir, os.path.join(td, "a", "ab"))
        self.assertEqual(real, ["aba", "abb", "abc"])
        self.assertEqual(list(virt), [])

        fsdir, real, virt = self.ls(vfs, "a/ac", "*")
        self.assertEqual(fsdir, os.path.join(td, "a", "ac"))
        self.assertEqual(real, ["aca", "acc"])
        self.assertEqual(list(virt), [])

        fsdir, real, virt = self.ls(vfs, "a/ac", "k")
        self.assertEqual(fsdir, os.path.join(td, "a", "ac"))
        self.assertEqual(real, ["aca", "acc"])
        self.assertEqual(list(virt), ["acb"])

        self.assertRaises(util.Pebkac, vfs.get, "a/ac/acb", "*", True, False)

        fsdir, real, virt = self.ls(vfs, "a/ac/acb", "k")
        self.assertEqual(fsdir, os.path.join(td, "a", "ac", "acb"))
        self.assertEqual(real, ["acba", "acbb", "acbc"])
        self.assertEqual(list(virt), [])

        # admin-only rootfs with all-read-only subfolder
        self.wipe_vfs(td)
        vfs = AuthSrv(
            Cfg(a=["k:k"], v=[".::rw,k", "a:a:r"]),
            self.log,
        ).vfs
        self.assertEqual(len(vfs.nodes), 1)
        self.assertEqual(vfs.vpath, "")
        self.assertEqual(vfs.realpath, td)
        self.assertAxs(vfs.axs.uread, ["k"])
        self.assertAxs(vfs.axs.uwrite, ["k"])
        n = vfs.nodes["a"]
        self.assertEqual(len(vfs.nodes), 1)
        self.assertEqual(n.vpath, "a")
        self.assertEqual(n.realpath, os.path.join(td, "a"))
        self.assertAxs(n.axs.uread, ["*", "k"])
        self.assertAxs(n.axs.uwrite, [])
        perm_na = (False, False, False, False, False, False, False, False)
        perm_rw = (True, True, False, False, False, False, False, False)
        perm_ro = (True, False, False, False, False, False, False, False)
        self.assertEqual(vfs.can_access("/", "*"), perm_na)
        self.assertEqual(vfs.can_access("/", "k"), perm_rw)
        self.assertEqual(vfs.can_access("/a", "*"), perm_ro)
        self.assertEqual(vfs.can_access("/a", "k"), perm_ro)

        # breadth-first construction
        self.wipe_vfs(td)
        vfs = AuthSrv(
            Cfg(
                v=[
                    "a/ac/acb:a/ac/acb:w",
                    "a:a:w",
                    ".::r",
                    "abacdfasdq:abacdfasdq:w",
                    "a/ac:a/ac:w",
                ],
            ),
            self.log,
        ).vfs

        # sanitizing relative paths
        self.undot(vfs, "foo/bar/../baz/qux", "foo/baz/qux")
        self.undot(vfs, "foo/../bar", "bar")
        self.undot(vfs, "foo/../../bar", "bar")
        self.undot(vfs, "foo/../../", "")
        self.undot(vfs, "./.././foo/", "foo")
        self.undot(vfs, "./.././foo/..", "")

        # shadowing
        self.wipe_vfs(td)
        vfs = AuthSrv(Cfg(v=[".::r", "b:a/ac:r"]), self.log).vfs

        fsp, r1, v1 = self.ls(vfs, "", "*")
        self.assertEqual(fsp, td)
        self.assertEqual(r1, ["b", "c"])
        self.assertEqual(list(v1), ["a"])

        fsp, r1, v1 = self.ls(vfs, "a", "*")
        self.assertEqual(fsp, os.path.join(td, "a"))
        self.assertEqual(r1, ["aa", "ab"])
        self.assertEqual(list(v1), ["ac"])

        fsp1, r1, v1 = self.ls(vfs, "a/ac", "*")
        fsp2, r2, v2 = self.ls(vfs, "b", "*")
        self.assertEqual(fsp1, os.path.join(td, "b"))
        self.assertEqual(fsp2, os.path.join(td, "b"))
        self.assertEqual(r1, ["ba", "bb", "bc"])
        self.assertEqual(r1, r2)
        self.assertEqual(list(v1), list(v2))

        # config file parser
        cfg_path = os.path.join(self.td, "test.cfg")
        with open(cfg_path, "wb") as f:
            f.write(
                util.dedent(
                    """
                    u a:123
                    u asd:fgh:jkl

                    ./src
                    /dst
                    r a
                    rw asd
                    """
                ).encode("utf-8")
            )

        self.wipe_vfs(td)
        au = AuthSrv(Cfg(c=[cfg_path]), self.log)
        self.assertEqual(au.acct["a"], "123")
        self.assertEqual(au.acct["asd"], "fgh:jkl")
        n = au.vfs
        # root was not defined, so PWD with no access to anyone
        self.assertEqual(n.vpath, "")
        self.assertEqual(n.realpath, "")
        self.assertAxs(n.axs.uread, [])
        self.assertAxs(n.axs.uwrite, [])
        self.assertEqual(len(n.nodes), 1)
        n = n.nodes["dst"]
        self.assertEqual(n.vpath, "dst")
        self.assertEqual(n.realpath, os.path.join(td, "src"))
        self.assertAxs(n.axs.uread, ["a", "asd"])
        self.assertAxs(n.axs.uwrite, ["asd"])
        self.assertEqual(len(n.nodes), 0)

        os.unlink(cfg_path)
